Move service ingest construction into ingest core #2221
Greptile Summary
This PR extracts the service-mode ingest construction into a new

| Filename | Overview |
|---|---|
| nemo_retriever/src/nemo_retriever/ingest/service.py | New module centralising service ingest construction; well-structured dataclasses and resolution logic, but _split_config_for_auto_documents silently produces an empty config when globs slip through unexpanded. |
| nemo_retriever/src/nemo_retriever/adapters/cli/ingest_workflow.py | Adds service dry-run serialisation and run_service_ingest_workflow; token redaction correctly catches service_api_token via endswith("token"). service_ingest_request_to_dry_run_data is unnecessarily public given it is only called by run_service_ingest_workflow within the same module. |
| nemo_retriever/src/nemo_retriever/adapters/cli/main.py | Wires service mode into root retriever ingest command; IngestorRunMode = Literal["inprocess","batch","service"] preserves Typer help-text choices correctly. Flag-rejection list (_ROOT_SERVICE_INCOMPATIBLE_FLAGS) is comprehensive. |
| nemo_retriever/src/nemo_retriever/pipeline/__main__.py | Bridges legacy pipeline run --run-mode service through new ingest.service module; removes ~75 lines of duplicated ingestor construction. The ValueError raised by build_service_ingestor for empty file matches propagates unhandled (flagged in an existing review thread). |
| nemo_retriever/tests/test_pipeline_helpers.py | Updated test_build_service_ingestor_wires_extract_embed_and_chunking to use the new ServiceIngestRequest + build_service_ingestor API; coverage adequate for the refactored helper. |
| nemo_retriever/tests/test_root_cli_workflow.py | Three new integration-level tests cover service mode execution, dry-run token redaction, and incompatible-flag rejection; test assertions are specific and well-scoped. |
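
The suffix-based token redaction mentioned in the overview can be illustrated with a minimal sketch. This is not the PR's `_strip_secret_values`; the function name, the `***` placeholder, and the recursive handling of nested dicts are assumptions made for illustration only.

```python
from typing import Any

# Placeholder chosen for this sketch; the PR's actual masking value is not shown.
REDACTED = "***"


def strip_secret_values(data: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``data`` with values of token-like keys masked."""
    cleaned: dict[str, Any] = {}
    for key, value in data.items():
        if isinstance(value, dict):
            # Recurse so nested option blocks are masked as well.
            cleaned[key] = strip_secret_values(value)
        elif key.lower().endswith("token") and value is not None:
            # Suffix match catches keys such as ``service_api_token``.
            cleaned[key] = REDACTED
        else:
            cleaned[key] = value
    return cleaned
```

A suffix match is simple and covers `service_api_token`, but it would miss secrets stored under keys such as `api_key`, so a real implementation may want a broader key list.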
Sequence Diagram
sequenceDiagram
participant CLI as retriever ingest CLI
participant Main as adapters/cli/main.py
participant IW as ingest_workflow.py
participant Svc as ingest/service.py
participant SI as ServiceIngestor
CLI->>Main: "ingest_command(run_mode="service", ...)"
Main->>Main: _reject_root_service_incompatible_flags(ctx)
Main->>Svc: resolve_service_ingest_request(ServiceIngestPlanRequest)
Svc->>Svc: _validate_service_input_type / profile
Svc->>Svc: resolve_service_documents (expand + validate)
Svc->>Svc: "_build_service_*_params"
Svc-->>Main: ServiceIngestRequest
Main->>IW: run_service_ingest_workflow(request)
alt dry_run
IW->>IW: _service_ingest_request_to_dry_run_data
IW-->>CLI: JSON payload (token redacted)
else execute
IW->>Svc: execute_service_ingest_request(request)
Svc->>Svc: build_service_ingestor(request)
Svc->>SI: ServiceIngestor(...).files(...).extract(...).embed(...)
SI-->>Svc: ingest result
Svc-->>IW: ServiceIngestExecutionResult
IW-->>CLI: summary dict
end
Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 2
nemo_retriever/src/nemo_retriever/adapters/cli/ingest_workflow.py:97-111
`service_ingest_request_to_dry_run_data` is public (no leading underscore) while its counterpart `_ingest_plan_to_dry_run_data` is private, and this function is only called from `run_service_ingest_workflow` within the same module. The inconsistency invites accidental external use and leaks an internal formatting detail into the public API surface.
```suggestion
def _service_ingest_request_to_dry_run_data(request: ServiceIngestRequest) -> dict[str, Any]:
    """Return the JSON payload printed by ``retriever ingest --run-mode service --dry-run``."""
    return {
        "dry_run": True,
        "run_mode": "service",
        "documents": list(request.documents),
        "input_type": request.input_type,
        "service": _strip_secret_values(asdict(request.connection)),
        "extract": _params_to_dry_run_dict(request.extract_params),
        "split_config": _params_to_dry_run_dict(service_split_config_for_request(request)),
        "dedup": _params_to_dry_run_dict(request.dedup_params),
        "caption": _params_to_dry_run_dict(request.caption_params),
        "embed": _params_to_dry_run_dict(request.embed_params),
        "store": _params_to_dry_run_dict(request.store_params),
    }
```
### Issue 2 of 2
nemo_retriever/src/nemo_retriever/ingest/service.py:1028-1047
**`_split_config_for_auto_documents` silently drops chunking when globs slip through unexpanded**
Any `document` for which `_glob.has_magic()` returns `True` is excluded from the `input_types` set. If every path still contains glob magic (e.g., the caller has not yet called `expand_service_file_patterns`), `input_types` is empty, `split_config` stays `{}`, and the function returns `None` — effectively disabling text-chunking with no error or warning. This path is not reachable through `build_service_ingestor` (which expands globs before calling `_attach_service_extract_stage`), but `service_split_config_for_request` calls this function with the raw `request.documents` that may still contain un-expanded patterns when consumed outside the standard resolution flow.
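
One way to make this failure loud rather than silent is sketched below. It is a hypothetical stand-in, not the PR's `_split_config_for_auto_documents`: the function name is invented, and file suffixes are used in place of the real media-type mapping, which is not shown in this thread. Only `glob.has_magic` is taken from the comment above.

```python
import glob
from pathlib import Path


def input_types_for_documents(documents: list[str]) -> set[str]:
    """Collect lowercase file suffixes for concrete paths (hypothetical helper).

    Raises ValueError when documents were supplied but every entry is still
    an unexpanded glob pattern, instead of returning an empty set.
    """
    concrete = [doc for doc in documents if not glob.has_magic(doc)]
    if documents and not concrete:
        raise ValueError(
            "All documents still contain glob patterns; expand them before "
            "deriving a split config: " + ", ".join(documents)
        )
    # Suffix stands in for the real input-type detection in this sketch.
    return {Path(doc).suffix.lower().lstrip(".") for doc in concrete}
```

Raising is the strictest option; logging a warning and returning an empty set would preserve current behaviour while still surfacing the problem.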
Reviews (5): Last reviewed commit: "Move service ingest construction into in..."
ingestor = ingest_service.build_service_ingestor(
    ingest_service.ServiceIngestRequest(
        documents=file_patterns,
        input_type=input_type,
        extract_params=extract_params,
        embed_params=embed_params,
        text_chunk_params=text_chunk_params,
        enable_text_chunk=enable_text_chunk,
        dedup_params=DedupParams(iou_threshold=dedup_iou_threshold) if enable_dedup else None,
        caption_params=(
            CaptionParams(
                context_text_max_chars=caption_context_text_max_chars,
                temperature=caption_temperature,
                top_p=caption_top_p,
                max_tokens=caption_max_tokens,
            )
            if enable_caption
            else None
        ),
        store_params=StoreParams(storage_uri=store_images_uri) if store_images_uri is not None else None,
        connection=ingest_service.ServiceIngestConnectionOptions(
            service_url=service_url,
            service_concurrency=service_concurrency,
            service_api_token=service_api_token,
        ),
    )
)
ValueError from empty file set is unhandled in the legacy pipeline path
The old _build_service_ingestor raised typer.BadParameter when no files matched the glob patterns; the replacement build_service_ingestor raises ValueError instead. The run() function here has only a try/finally (no except), so any ValueError propagates unhandled to Typer and produces a raw Python traceback rather than a clean error message. This regression is reachable when _resolve_file_patterns constructs a valid glob like dir/**/*.pdf but the directory is empty — the upstream helper validates path existence, not that the glob actually matches files.
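
A minimal sketch of one possible fix is to translate the `ValueError` at the CLI boundary. The wrapper name `build_or_bad_parameter` is hypothetical, and the fallback exception class exists only so the sketch runs where `typer` is not installed; the real `run()` body is not shown in this thread.

```python
from typing import Callable, TypeVar

try:
    # Preferred: the exception the old helper raised, per the comment above.
    from typer import BadParameter
except ImportError:
    class BadParameter(Exception):
        """Stand-in used only when typer is not installed."""


T = TypeVar("T")


def build_or_bad_parameter(build: Callable[[], T]) -> T:
    """Call ``build`` and re-raise ValueError as a CLI usage error."""
    try:
        return build()
    except ValueError as exc:
        # Keep the original message and chain the cause for debugging.
        raise BadParameter(str(exc)) from exc
```

In the legacy path this could wrap the `build_service_ingestor(...)` call, for example `build_or_bad_parameter(lambda: ingest_service.build_service_ingestor(request))`, so an empty glob match produces a usage error instead of a traceback.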
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/pipeline/__main__.py
Line: 1326-1352
Comment:
**`ValueError` from empty file set is unhandled in the legacy pipeline path**
The old `_build_service_ingestor` raised `typer.BadParameter` when no files matched the glob patterns; the replacement `build_service_ingestor` raises `ValueError` instead. The `run()` function here has only a `try/finally` (no `except`), so any `ValueError` propagates unhandled to Typer and produces a raw Python traceback rather than a clean error message. This regression is reachable when `_resolve_file_patterns` constructs a valid glob like `dir/**/*.pdf` but the directory is empty — the upstream helper validates path existence, not that the glob actually matches files.
How can I resolve this? If you propose a fix, please make it concise.
def execute_service_ingest_request(request: ServiceIngestRequest) -> ServiceIngestExecutionResult:
    result = build_service_ingestor(request).ingest()
    result_n_rows = _count_service_result_rows(result)
    return ServiceIngestExecutionResult(
        request=request,
        result=result,
        n_rows=result_n_rows,
        result_n_rows=result_n_rows,
        metadata={
            "service_url": request.connection.service_url,
            "input_type": request.input_type,
        },
    )
No unit tests for the new ingest/service.py module
nemo_retriever/ingest/service.py introduces substantial new business logic — profile-based extract defaults, document-type validation, caption/dedup/chunk param builders with their own error conditions, _split_config_for_auto_documents, and ServiceIngestExecutionResult — but no corresponding test_ingest_service.py exists. The new CLI-level tests exercise the wiring end-to-end, but functions like _build_service_caption_params (which raises on caption_context_text_max_chars < 0), _build_service_dedup_params (raises when iou_threshold is set without enabled), and _split_config_for_auto_documents (mixed media type branching) are untested in isolation. Per the test-mirrors-source-structure and test-coverage-new-code rules, a companion test module is required.
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/ingest/service.py
Line: 259-271
Comment:
**No unit tests for the new `ingest/service.py` module**
`nemo_retriever/ingest/service.py` introduces substantial new business logic — profile-based extract defaults, document-type validation, caption/dedup/chunk param builders with their own error conditions, `_split_config_for_auto_documents`, and `ServiceIngestExecutionResult` — but no corresponding `test_ingest_service.py` exists. The new CLI-level tests exercise the wiring end-to-end, but functions like `_build_service_caption_params` (which raises on `caption_context_text_max_chars < 0`), `_build_service_dedup_params` (raises when `iou_threshold` is set without `enabled`), and `_split_config_for_auto_documents` (mixed media type branching) are untested in isolation. Per the `test-mirrors-source-structure` and `test-coverage-new-code` rules, a companion test module is required.
How can I resolve this? If you propose a fix, please make it concise.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
@dataclass(frozen=True)
class ServiceIngestExecutionResult:
    request: ServiceIngestRequest
    result: object
    n_rows: int | None
    result_n_rows: int | None
    metadata: dict[str, Any]

    @property
    def documents(self) -> list[str]:
        return list(self.request.documents)

    @property
    def service_url(self) -> str:
        return self.request.connection.service_url

    def to_summary_dict(self) -> dict[str, Any]:
        return {
            "run_mode": "service",
            "documents": self.documents,
            "service_url": self.service_url,
            "result": self.result,
            "n_rows": self.n_rows,
            "result_n_rows": self.result_n_rows,
        }
n_rows and result_n_rows are always identical, creating a confusing public API
In execute_service_ingest_request, both fields are set to the same value (_count_service_result_rows(result)) and both are emitted in to_summary_dict. If the intent was to distinguish "rows reported by the service" from "rows counted from the ingest result object", that distinction is not implemented — callers see two keys with the same number and no documented difference. The redundant field should either be removed, or the two fields should be sourced from different places with their distinction documented.
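
The "remove the redundant field" option could look roughly like the sketch below. `ExecutionSummary` is a simplified, hypothetical stand-in for `ServiceIngestExecutionResult` (it flattens the nested request into plain fields), shown only to illustrate a summary that carries a single row count.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class ExecutionSummary:
    """Simplified stand-in result type with one row-count field."""

    documents: tuple[str, ...]
    service_url: str
    result: object
    n_rows: Optional[int]  # single source of truth for the row count

    def to_summary_dict(self) -> dict[str, Any]:
        return {
            "run_mode": "service",
            "documents": list(self.documents),
            "service_url": self.service_url,
            "result": self.result,
            "n_rows": self.n_rows,
        }
```

If existing callers already read `result_n_rows`, keeping it temporarily as a deprecated read-only property that returns `n_rows` would be a gentler migration than dropping the key outright.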
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/ingest/service.py
Line: 134-158
Comment:
**`n_rows` and `result_n_rows` are always identical, creating a confusing public API**
In `execute_service_ingest_request`, both fields are set to the same value (`_count_service_result_rows(result)`) and both are emitted in `to_summary_dict`. If the intent was to distinguish "rows reported by the service" from "rows counted from the ingest result object", that distinction is not implemented — callers see two keys with the same number and no documented difference. The redundant field should either be removed, or the two fields should be sourced from different places with their distinction documented.
How can I resolve this? If you propose a fix, please make it concise.
  run_mode: str = typer.Option(
      "inprocess",
      "--run-mode",
-     help="Execution mode for the SDK ingestor. Defaults to inprocess; use batch for Ray Data scale-out.",
+     help=(
+         "Execution mode for ingest: inprocess (default), batch for Ray Data scale-out, "
+         "or service for a remote retriever service."
+     ),
  ),
run_mode changed from IngestRunModeValue to str, so Typer no longer renders valid choices in --help output (was [inprocess|batch], now just TEXT) and no longer performs automatic validation before the function body runs. Adding click.Choice preserves the Typer/Click help-text and validation ergonomics without sacrificing the new service value.
- run_mode: str = typer.Option(
-     "inprocess",
-     "--run-mode",
-     help="Execution mode for the SDK ingestor. Defaults to inprocess; use batch for Ray Data scale-out.",
-     help=(
-         "Execution mode for ingest: inprocess (default), batch for Ray Data scale-out, "
-         "or service for a remote retriever service."
-     ),
- ),
+ run_mode: str = typer.Option(
+     "inprocess",
+     "--run-mode",
+     click_type=click.Choice(["inprocess", "batch", "service"]),
+     help=(
+         "Execution mode for ingest: inprocess (default), batch for Ray Data scale-out, "
+         "or service for a remote retriever service."
+     ),
+ ),
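
The file overview notes that `main.py` uses `IngestorRunMode = Literal["inprocess","batch","service"]` rather than `click.Choice`. A minimal, standard-library sketch of deriving the valid values from such an alias follows; `VALID_RUN_MODES` and `validate_run_mode` are hypothetical names, and how the PR actually wires the alias into Typer is not shown in this thread.

```python
from typing import Literal, get_args

# Alias as quoted in the file overview for adapters/cli/main.py.
IngestorRunMode = Literal["inprocess", "batch", "service"]

# Derive the allowed values from the alias so there is one source of truth.
VALID_RUN_MODES: tuple[str, ...] = get_args(IngestorRunMode)


def validate_run_mode(value: str) -> str:
    """Return ``value`` if it is a known run mode, else raise ValueError."""
    if value not in VALID_RUN_MODES:
        raise ValueError(
            f"Invalid run mode {value!r}; expected one of: {', '.join(VALID_RUN_MODES)}"
        )
    return value
```

Keeping the choices in one `Literal` alias means help text, validation, and type checking cannot drift apart when a new mode is added.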
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/adapters/cli/main.py
Line: 200-207
Comment:
`run_mode` changed from `IngestRunModeValue` to `str`, so Typer no longer renders valid choices in `--help` output (was `[inprocess|batch]`, now just `TEXT`) and no longer performs automatic validation before the function body runs. Adding `click.Choice` preserves the Typer/Click help-text and validation ergonomics without sacrificing the new `service` value.
```suggestion
    run_mode: str = typer.Option(
        "inprocess",
        "--run-mode",
        click_type=click.Choice(["inprocess", "batch", "service"]),
        help=(
            "Execution mode for ingest: inprocess (default), batch for Ray Data scale-out, "
            "or service for a remote retriever service."
        ),
    ),
```
How can I resolve this? If you propose a fix, please make it concise.
def execute_service_ingest_request(request: ServiceIngestRequest) -> ServiceIngestExecutionResult:
    result = build_service_ingestor(request).ingest()
Public functions execute_service_ingest_request, expand_service_file_patterns, and service_split_config_for_request lack docstrings. Per the docstrings-public-interface rule, all public functions must describe their behaviour, parameters, returns, and any exceptions raised.
- def execute_service_ingest_request(request: ServiceIngestRequest) -> ServiceIngestExecutionResult:
-     result = build_service_ingestor(request).ingest()
+ def execute_service_ingest_request(request: ServiceIngestRequest) -> ServiceIngestExecutionResult:
+     """Execute a resolved service ingest request and return a structured result.
+
+     Args:
+         request: A fully-resolved ``ServiceIngestRequest`` produced by
+             ``resolve_service_ingest_request`` or constructed directly.
+
+     Returns:
+         A ``ServiceIngestExecutionResult`` containing the raw ingest result,
+         the row count (when the result exposes a ``dataframe`` attribute), and
+         connection metadata.
+
+     Raises:
+         ValueError: If no files matched the input patterns in *request*.
+     """
+     result = build_service_ingestor(request).ingest()
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/ingest/service.py
Line: 259-260
Comment:
Public functions `execute_service_ingest_request`, `expand_service_file_patterns`, and `service_split_config_for_request` lack docstrings. Per the `docstrings-public-interface` rule, all public functions must describe their behaviour, parameters, returns, and any exceptions raised.
```suggestion
def execute_service_ingest_request(request: ServiceIngestRequest) -> ServiceIngestExecutionResult:
    """Execute a resolved service ingest request and return a structured result.

    Args:
        request: A fully-resolved ``ServiceIngestRequest`` produced by
            ``resolve_service_ingest_request`` or constructed directly.

    Returns:
        A ``ServiceIngestExecutionResult`` containing the raw ingest result,
        the row count (when the result exposes a ``dataframe`` attribute), and
        connection metadata.

    Raises:
        ValueError: If no files matched the input patterns in *request*.
    """
    result = build_service_ingestor(request).ingest()
```
How can I resolve this? If you propose a fix, please make it concise.
05bc509 to e574193
592b556 to 137c57a
Summary
- `nemo_retriever.ingest.service` as the service ingest core for typed service ingest request/options, request resolution, service ingestor construction, and execution summary
- `retriever ingest --run-mode service` with service URL/concurrency/token options
- `retriever pipeline run --run-mode service` through the same service ingest core instead of keeping service construction in `pipeline/__main__.py`

Migration contract
- `retriever ingest` + `retriever query` are the forward path intended to replace `retriever pipeline run`
- `retriever ingest --run-mode service` can ingest into service-mode systems
- `retriever query` gains service support, `retriever pipeline run --run-mode service` remains available as a compatibility bridge for existing service workflows
- `retriever ingest --run-mode service` followed by `retriever query --run-mode service`

What did not move
- `GraphIngestor` execution remain in `ingest.plan` / `ingest.execution`

Validation
- `py_compile` on touched Python source files
- `retriever ingest --run-mode service` ingested jp20, 20 PDFs -> 3352 rows, service job completed 20/20 with 0 failures
- `/v1/query`: 115 queries, 0 empty-hit queries, recall@10=0.9652

Notes
- `pre-commit` was not installed as a standalone command in the shell, but the repository commit hooks ran and passed during `git commit --amend`.
- `pytest` was not available in the local venv, so focused pytest was not run here.